qpid 是符合AMQP规范的apache 许可证的消息中间件,目前在openstack中作为一种可选的消息中间件服务配置,其他还有rabbitmq和zeroMQ
如果看过qpid的编程API文档的话,会看到比较简单的一个例子, 如下(接收消息打印消息内容)
1 | #following code is from |
connection和session是1对多的关系,每个session保证消息的顺序接收,让session创建对应的sender和receiver,这里我们只需要创建receiver
这个程序看起来很好,如果直接传一个地址, 比如(我们想要接收openstack glance的message)
运行如下:
python drain.py -b admin/qpid@localhost glance
如果我们想要实现连接断掉后重新自动连接,我们可以传入参数 -r
上面的程序有个问题,如果没有收到消息就断开连接了。
新需求1: 我们要实现持续的监听接收消息, 改一下
1 | try: |
这样就可以了。
还不行,
新需求2:我们要实现断后重新自动连接, 可以, 传入参数 -r,
解决了
问题出现了,你会发现在service qpidd restart后,程序会抛出exception
qpid.messaging.exceptions.NotFound: no such queue: glance
新需求3:显然这是由于我们创建的queue不是durable的,所以需要在qpid restart后能够正常运行,而不是退出。
这就需要我们创建的queue能够在接收到消息的时候自动创建完成,
解决方法: rcv = ssn.receiver(addr + “; {create: always}”)
这样就完成queue的按需创建
新需求4: qpid 默认的reconnect上面的是基于固定interval,我们想改变重新建立连接的算法,实现2的指数式建立连接
解决方法: 我们写入自己的自动重连方法, 一个简单的例子如下:
1 | def reconnect(): |
这样就OK了
总结: 经过上面的需求变化,我们新的接收消息程序如下:
1 | import optparse |